package com.ld.shieldsb.canalclient.handler.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import org.apache.solr.client.solrj.SolrClient;

import com.alibaba.google.common.collect.ImmutableMap;
import com.ld.shieldsb.canalclient.etl.EtlConsumer;
import com.ld.shieldsb.canalclient.handler.ICanalDataSyncHandler;
import com.ld.shieldsb.canalclient.handler.config.AdapterConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig.DbMapping;
import com.ld.shieldsb.canalclient.handler.config.OuterAdapterConfig;
import com.ld.shieldsb.canalclient.handler.impl.solr.SolrEtlService;
import com.ld.shieldsb.canalclient.handler.impl.solr.SolrSyncService;
import com.ld.shieldsb.canalclient.model.Dml;
import com.ld.shieldsb.canalclient.model.EtlConditions;
import com.ld.shieldsb.canalclient.recoder.Recorder;
import com.ld.shieldsb.common.core.collections.ListUtils;
import com.ld.shieldsb.common.core.model.Result;
import com.ld.shieldsb.common.core.util.ResultUtil;
import com.ld.shieldsb.common.core.util.StringUtils;
import com.ld.shieldsb.common.core.util.solr.SolrUtil;

import lombok.extern.slf4j.Slf4j;

/**
 * 更新数据到solr
 * 
 * @ClassName Date2SolrHandler
 * @author <a href="mailto:donggongai@126.com" target="_blank">吕凯</a>
 * @date 2021年11月13日 下午3:52:57
 *
 */
@Slf4j
public class Data2SolrHandler implements ICanalDataSyncHandler {
    public static final String TYPE_NAME = "solr";
    private Map<String, Map<String, MappingConfig>> mappingConfigCache;
    private SolrSyncService solrSyncService;
    private boolean inited = false; // 初始化字段标识
    private String key; // 唯一标识
    private Map<String, SolrClient> solrClientMap = new HashMap<>(); // solrClient的map

    public static final String PROPERTIES_DESP = "url：solr的根url（不需要到core）;\n username：solr的用户名（没有不填）;\n password：solr的密码（没有不填）;"; // OuterAdapterConfig中properties参数说明，必须有

    @Override
    public void sync(List<Dml> dmls, Recorder recorder) {
        if (dmls == null || dmls.isEmpty()) {
            return;
        }
        try {
            solrSyncService.sync(mappingConfigCache, dmls, recorder, key);
        } catch (Exception e) {
            log.error("[" + key + "]Solr存储数据处理失败！！！", e);
        }
    }

    @Override
    public void init(OuterAdapterConfig configuration) {
        key = configuration.getKey();
        if (log.isWarnEnabled()) {
            log.warn("Solr处理器[key:{}]初始化", configuration.getKey());
        }
        // 初始化客户端连接池
        Map<String, String> properties = configuration.getProperties();
        String url = properties.get("url");
        String userName = properties.get("username");
        String password = properties.get("password");

        mappingConfigCache = new ConcurrentHashMap<>(); // 库名-表名对应配置

        List<AdapterConfig> configs = configuration.getConfigs();
        if (ListUtils.isNotEmpty(configs)) {
            for (AdapterConfig config : configs) {
                if (config instanceof MappingConfig) { // 类型判断

                    MappingConfig mc = (MappingConfig) config;
                    String sourceDatabase = mc.getDbMapping().getDatabase();
                    String sourceTable = mc.getDbMapping().getTable();
                    String targetTable = mc.getDbMapping().getTargetTable();

                    SolrClient solrClient = SolrUtil.getSolrClient(url + "/" + targetTable, userName, password);
                    solrClientMap.put(sourceTable, solrClient);
                    mappingConfigCache.put(mc.getDestination() + "_" + sourceDatabase + "-" + sourceTable, ImmutableMap.of("rdb", mc));
                }
            }
        }

        solrSyncService = new SolrSyncService(solrClientMap);
        inited = true;
    }

    @Override
    public void destroy() {
        if (solrSyncService != null) {
            solrSyncService.close();
        }
    }

    @Override
    public String getType() {
        return TYPE_NAME;
    }

    @Override
    public boolean getInit() {
        return inited;
    }

    @Override
    public Result testConn() {
        return ResultUtil.success("");
    }

    @Override
    public String getKey() {
        return key;
    }

    @Override
    public Result syncSingle(Dml dml, int dataIndex, String targetDb, String targetTable, Recorder recorder) {
        return solrSyncService.syncSingle(mappingConfigCache, key, dml, dataIndex, targetDb, targetTable);
    }

    /**
     * ETL方法
     *
     * @param task
     *            任务名, 对应配置名,任务名对应配置文件名 mytest_user.yml
     * @param params
     *            etl筛选条件
     * @return ETL结果
     */
    @Override
    public Result etl(String destination, String database, String table, String targetDb, String targetTable, EtlConditions etlConditions,
            Consumer<EtlConsumer> con) {
        Result etlResult = new Result();
        MappingConfig config = null;

        Map<String, MappingConfig> configMap = mappingConfigCache.get(destination + "_" + database + "-" + table);

        if (configMap == null || configMap.values().isEmpty()) {
            return ResultUtil.error("配置参数为空！");
        }
        String etlCondition = etlConditions.getEtlCondition();
        String etlDelta = etlConditions.getEtlDelta();
        List<String> params = etlConditions.getParams();
        for (MappingConfig configM : configMap.values()) {
            DbMapping dbMapping = configM.getDbMapping();
            String mappingTargetTable = (dbMapping.getTargetTable() == null ? "" : dbMapping.getTargetTable());
            String mappingTargetDb = (dbMapping.getTargetDb() == null ? "" : dbMapping.getTargetDb());
            if (mappingTargetDb.equalsIgnoreCase(targetDb) && mappingTargetTable.equalsIgnoreCase(targetTable)) {
                config = configM;
                // 覆盖etl的条件,参数为空说明不需要传参，那么就是没有附件条件，不修改
                if ((StringUtils.isNotEmpty(etlDelta) || StringUtils.isNotEmpty(etlCondition))) {
                    dbMapping.setEtlCondition(etlCondition);
                    dbMapping.setEtlDelta(etlDelta);
                }
            }
        }
        SolrClient solrClient = solrClientMap.get(table);
        SolrEtlService etlService = new SolrEtlService(solrClient, config);
        if (config != null) {
            if (StringUtils.isNotEmpty(etlConditions.getDataSourceKey())) {
                config.setDataSourceKey(etlConditions.getDataSourceKey());
            }
            return etlService.importData(params, etlConditions.getType(), con);
        }
        etlResult.setSuccess(false);
        etlResult.setErrorMessage("任务未找到！");
        return etlResult;
    }

}
